Skip to content

Conversation

@teocns
Copy link
Contributor

@teocns teocns commented Feb 4, 2025

Capture terminal output to be displayed in the agentops dashboard. Session-based capture functionality is currently under development. Captures will be displayed in the AgentOps dashboard under "Terminal".

At the moment we capture:

  • logging, i.e logging.info(), logging.error(), etc.
  • stdout/stderr, i.e print()

Key changes in this PR:

  1. HTTP Client Refactoring
  • Consolidated HTTP methods (GET, POST, PUT, DELETE) into a single _make_request method
  • Added PUT and DELETE methods in HttpClient
  • Made payload parameter optional
  1. New Logging & Instrumentation Features
  • Added instrumentation.py for OpenTelemetry setup and configuration
  • Added log_capture.py with classes for capturing stdout/stderr and logging
  • Added session-specific logging with ANSI color support
  • Added log export functionality to send logs to AgentOps backend
  1. Session Updates
  • Added logging components to Session class
  • Added log exporter for sending logs to backend
  • Updated session cleanup to handle logging components
  1. Testing

@codecov
Copy link

codecov bot commented Feb 4, 2025

Codecov Report

Attention: Patch coverage is 71.04478% with 97 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
agentops/log_capture.py 70.96% 54 Missing ⚠️
agentops/session.py 57.95% 37 Missing ⚠️
agentops/instrumentation.py 93.02% 3 Missing ⚠️
agentops/http_client.py 86.66% 2 Missing ⚠️
agentops/helpers.py 66.66% 1 Missing ⚠️

📢 Thoughts on this report? Let us know!

@teocns teocns requested a review from areibman February 6, 2025 01:57
teocns added 27 commits February 6, 2025 04:11
Rich formatted text with various styles (color, bold, italic)
Direct ANSI codes with proper newlines
Mixed color text in both stdout and stderr
All ANSI codes are preserved in the logged output

Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Added a private _make_request method that handles all the common request
logicSimplified the public HTTP methods (GET/POST/PUT/DELETE) to use the
common handlerEach method only specifies its unique parameters and
passes them to _make_requestAdded DELETE method for
completenessMaintained all existing error handling and response
processingKept the connection pooling and header preparation logic
unchanged

Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
…ed for this session

Signed-off-by: Teo <[email protected]>

logger -> agentops_logger

Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
Signed-off-by: Teo <[email protected]>
@teocns
Copy link
Contributor Author

teocns commented Feb 6, 2025

The current implementation has a potential issue: it doesn't guarantee that logs from one session won't leak into another when running concurrently.

This needs to be addressed with callstack lookup or other methods. @areibman: otherwise, how does agentops actually know which session to assign llm events to? would the same mechanism work?

Comment on lines +61 to +102
def setup_session_telemetry(session, log_exporter) -> tuple[LoggingHandler, BatchLogRecordProcessor]:
"""Set up OpenTelemetry logging components for a new session.
This function creates the necessary components to capture and export logs for a specific session:
- A LoggerProvider with session-specific resource attributes
- A BatchLogRecordProcessor to batch and export logs
- A LoggingHandler to capture logs and forward them to the processor
Args:
session_id: Unique identifier for the session, used to tag telemetry data
log_exporter: SessionLogExporter instance that handles sending logs to AgentOps backend
Returns:
Tuple containing:
- LoggingHandler: Handler that should be added to the logger
- BatchLogRecordProcessor: Processor that batches and exports logs
"""
# Create logging components
resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"})
logger_provider = LoggerProvider(resource=resource)

# Create processor and handler
log_processor = BatchLogRecordProcessor(log_exporter)
logger_provider.add_log_record_processor(log_processor) # Add processor to provider

from agentops.log_capture import LogCapture

logcap = LogCapture(
session,
)

logcap.start()

# log_handler = LoggingHandler(
# level=logging.INFO,
# logger_provider=logger_provider,
# )
#
# # Register handler with session
# set_session_handler(session_id, log_handler)
#
# return log_handler, log_processor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function setup_session_telemetry no longer returns the promised tuple[LoggingHandler, BatchLogRecordProcessor] as specified in its return type annotation, causing type checking failures and potential runtime errors.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
def setup_session_telemetry(session, log_exporter) -> tuple[LoggingHandler, BatchLogRecordProcessor]:
"""Set up OpenTelemetry logging components for a new session.
This function creates the necessary components to capture and export logs for a specific session:
- A LoggerProvider with session-specific resource attributes
- A BatchLogRecordProcessor to batch and export logs
- A LoggingHandler to capture logs and forward them to the processor
Args:
session_id: Unique identifier for the session, used to tag telemetry data
log_exporter: SessionLogExporter instance that handles sending logs to AgentOps backend
Returns:
Tuple containing:
- LoggingHandler: Handler that should be added to the logger
- BatchLogRecordProcessor: Processor that batches and exports logs
"""
# Create logging components
resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"})
logger_provider = LoggerProvider(resource=resource)
# Create processor and handler
log_processor = BatchLogRecordProcessor(log_exporter)
logger_provider.add_log_record_processor(log_processor) # Add processor to provider
from agentops.log_capture import LogCapture
logcap = LogCapture(
session,
)
logcap.start()
# log_handler = LoggingHandler(
# level=logging.INFO,
# logger_provider=logger_provider,
# )
#
# # Register handler with session
# set_session_handler(session_id, log_handler)
#
# return log_handler, log_processor
def setup_session_telemetry(session, log_exporter) -> tuple[LoggingHandler, BatchLogRecordProcessor]:
# ...
logcap.start()
return log_handler, log_processor

Comment on lines +79 to +84
resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"})
logger_provider = LoggerProvider(resource=resource)

# Create processor and handler
log_processor = BatchLogRecordProcessor(log_exporter)
logger_provider.add_log_record_processor(log_processor) # Add processor to provider

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log_processor and logger_provider are created but never used since the code is commented out, leading to resource leaks and non-functional logging setup.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"})
logger_provider = LoggerProvider(resource=resource)
# Create processor and handler
log_processor = BatchLogRecordProcessor(log_exporter)
logger_provider.add_log_record_processor(log_processor) # Add processor to provider
resource = Resource.create({SERVICE_NAME: f"agentops.session.{session.session_id}"})
logger_provider = LoggerProvider(resource=resource)
# Create processor and handler
log_processor = BatchLogRecordProcessor(log_exporter)
logger_provider.add_log_record_processor(log_processor)

# Setup logger provider with console exporter
resource = Resource.create(resource_attrs)
self._logger_provider = LoggerProvider(resource=resource)
self._logger_provider.add_log_record_processor(BatchLogRecordProcessor(self.session._log_exporter))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accessing self.session._log_exporter directly breaks encapsulation and may fail if the internal implementation changes. Should use a public accessor method instead.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
self._logger_provider.add_log_record_processor(BatchLogRecordProcessor(self.session._log_exporter))
self._logger_provider.add_log_record_processor(BatchLogRecordProcessor(self.session.get_log_exporter()))

Comment on lines 590 to +634
def _reauthorize_jwt(self) -> Union[str, None]:
with self._lock:
payload = {"session_id": self.session_id}
serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8")
res = HttpClient.post(
f"{self.config.endpoint}/v2/reauthorize_jwt",
serialized_payload,
self.config.api_key,
)

logger.debug(res.body)

if res.code != 200:
try:
serialized_payload = safe_serialize(payload).encode("utf-8")
res = HttpClient.post(
f"{self.config.endpoint}/v2/reauthorize_jwt",
serialized_payload,
self.config.api_key,
)
if not res:
return None
jwt = res.body.get("jwt")
self.jwt = jwt
return jwt
except Exception as e:
logger.error(f"Failed to reauthorize JWT: {e}")
return None

jwt = res.body.get("jwt", None)
self.jwt = jwt
return jwt

def _start_session(self):
with self._lock:
payload = {"session": self.__dict__}
serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8")

try:
serialized_payload = safe_serialize(payload).encode("utf-8")
res = HttpClient.post(
f"{self.config.endpoint}/v2/create_session",
serialized_payload,
api_key=self.config.api_key,
parent_key=self.config.parent_key,
)
except ApiServerException as e:
return logger.error(f"Could not start session - {e}")
if not res:
return False
jwt = res.body.get("jwt")
self.jwt = jwt
if jwt is None:
return False

logger.debug(res.body)

if res.code != 200:
return False

jwt = res.body.get("jwt", None)
self.jwt = jwt
if jwt is None:
return False

logger.info(
colored(
f"\x1b[34mSession Replay: {self.session_url}\x1b[0m",
"blue",
logger.info(
colored(
f"\x1b[34mSession Replay: {self.session_url}\x1b[0m",
"blue",
)
)
)

return True
add_session(self)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition risk: add_session() is called after session setup completes, while remove_session() happens first in cleanup. Should be reversed to prevent tracking partially initialized sessions.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
def _reauthorize_jwt(self) -> Union[str, None]:
with self._lock:
payload = {"session_id": self.session_id}
serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8")
res = HttpClient.post(
f"{self.config.endpoint}/v2/reauthorize_jwt",
serialized_payload,
self.config.api_key,
)
logger.debug(res.body)
if res.code != 200:
try:
serialized_payload = safe_serialize(payload).encode("utf-8")
res = HttpClient.post(
f"{self.config.endpoint}/v2/reauthorize_jwt",
serialized_payload,
self.config.api_key,
)
if not res:
return None
jwt = res.body.get("jwt")
self.jwt = jwt
return jwt
except Exception as e:
logger.error(f"Failed to reauthorize JWT: {e}")
return None
jwt = res.body.get("jwt", None)
self.jwt = jwt
return jwt
def _start_session(self):
with self._lock:
payload = {"session": self.__dict__}
serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8")
try:
serialized_payload = safe_serialize(payload).encode("utf-8")
res = HttpClient.post(
f"{self.config.endpoint}/v2/create_session",
serialized_payload,
api_key=self.config.api_key,
parent_key=self.config.parent_key,
)
except ApiServerException as e:
return logger.error(f"Could not start session - {e}")
if not res:
return False
jwt = res.body.get("jwt")
self.jwt = jwt
if jwt is None:
return False
logger.debug(res.body)
if res.code != 200:
return False
jwt = res.body.get("jwt", None)
self.jwt = jwt
if jwt is None:
return False
logger.info(
colored(
f"\x1b[34mSession Replay: {self.session_url}\x1b[0m",
"blue",
logger.info(
colored(
f"\x1b[34mSession Replay: {self.session_url}\x1b[0m",
"blue",
)
)
)
return True
add_session(self)
finally:
add_session(self)
...
remove_session(self)

@github-actions
Copy link
Contributor

This pull request has been automatically marked as stale because it has not had any activity in the last 14 days.

If no updates are made within 7 days, this PR will be automatically closed.

@github-actions github-actions bot added the stale label Feb 27, 2025
@github-actions
Copy link
Contributor

github-actions bot commented Mar 6, 2025

This pull request has been automatically closed because it has been stale for 7 days with no activity.

Feel free to reopen this PR if you'd like to continue working on it.

@github-actions github-actions bot closed this Mar 6, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants